package scala.org.zjt.spark.recommender

import java.io.PrintWriter

import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS, LogisticRegressionWithSGD}
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.regression.{LabeledPoint}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.Map

/**
  * 特征：  screen11 : 屏幕11英寸
  *
  *
  *
  * 稀疏向量： 利用稀疏向量对物体的全部中每个特征。0，1有无标识。
  *
  *           有限特征集：
  *               特征：属性+数值
  *                 有无特征（属性+数值）： 1 为有该特征   0 是没有该特征
  *
  *           无线特征集：
  *               特征：属性
  *                   0 为无该属性 ，非0为该属性值。
  *
  *
  *           每行数据 --> 稀疏向量：（总长度 ， 非零索引数组 ，非零value数组 ）
  *
  */
object Recommonder {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)


    //按照\t做分割切分，得到的是label和features字符串
    val data = sc.textFile("D:\\课程\\推荐系统\\资料\\000001_0").map(_.split("\t"))


    //如果用map得到的是RDD[Array[String]],flatmap可以压平，讲Array里面String释放出来，这里的map实际把:1去掉
    //去重得到特征的字典映射   得到：[screen11,screen110,device_id6........]
    val features: RDD[String] = data.flatMap(_.drop(1)(0).split(";")).map(_.split(":")(0)).distinct()


    //转成map为了后面得到稀疏向量非零下标用
    //加上索引,该索引代表该属性
    val dict: Map[String, Long] = features.zipWithIndex().collectAsMap()    //全向量


    //构建labelpoint,分label和vector两部分
    val traindata: RDD[LabeledPoint] = data.map(x => {
      //得到label，逻辑回归只支持0.0和1.0这里需要转换一下
      val label = x.take(1)(0) match {
        case "-1" => 0.0
        case "1" => 1.0
      }

      // 得到特征在集合中的索引。index
      val index: Array[Int] = x.drop(1)(0).split(";").map(_.split(":")(0)).map(
        fe => {
          val index: Long = dict.get(fe) match {
            case Some(n) => n
            case None => 0
          }
          index.toInt
        }
      )

      //创建一个所有元素是1.0的数组，作为稀疏向量非零元素集合      稀疏向量：（总长度 ， 非零索引数组 ，非零value数组 ）
      val vector = new SparseVector(dict.size, index, Array.fill(index.length)(1.0))

      //构建LabeledPoint
      new LabeledPoint(label, vector)
    })


    //模型训练，两个参数分别是迭代次数和步长
    val model: LogisticRegressionModel = new LogisticRegressionWithLBFGS().setNumClasses(10).run(traindata)

    // 废弃的函数： val model2: LogisticRegressionModel= LogisticRegressionWithSGD.train(traindata, 500, 0.01)
    // Run training algorithm to build the model
    // setNumClasses 设置分类的类别数量。默认的是二分类问题！   关联
    // val model2 = new LogisticRegressionWithLBFGS().setNumClasses(10).run(traindata)


    //得到权重
    val weights = model.weights.toArray


    //将原来的字典表反转，根据下标找对应的特征字符串
    val map: Map[Long, String] = dict.map(x => {
      (x._2, x._1)
    })

    val pw = new PrintWriter("D:\\课程\\推荐系统\\资料\\out");


    //输出： 特征和对象的权重
    for (i <- 0 until weights.length) {
      val feartureName = map.get(i) match {
        case Some(x) => x
        case None => ""
      }

      val result = feartureName + "\t" + weights(i)
      pw.write(result)
      pw.println()
    }
    pw.flush()
    pw.close()
  }
}
